{
  "metadata" : {
    "name" : "Classifying Genomes using Adam with RF",
    "user_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Set the local dependencies repository"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":local-repo /tmp/spark-notebook/repo",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### ADAM dependencies for the notebook"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":dp org.bdgenomics.adam % adam-core % 0.16.0\n- org.apache.hadoop % hadoop-client %   _\n- org.apache.spark  % spark-core    %   _\n- org.scala-lang    %     _         %   _\n- org.scoverage     %     _         %   _\n+ org.apache.spark  %  spark-mllib_2.10  % 1.3.1",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Get the master hostname, define a function to build hdfs urls and reset the spark context with appropriate configuration"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Get the master hostname and define a function to build hdfs urls"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import sys.process._\nval master = (\"ec2-metadata --public-hostname\"!!).drop(\"public-hostname: \".size).mkString.trim\ndef hu(s:String) = s\"hdfs://$master:9010/temp/$s\"",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Set the AWS keys to get genotypes from S3"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val fs_s3_awsAccessKeyId      = sys.env.get(\"AWS_ACCESS_KEY_ID\").getOrElse(\"<hard-code-one>\")\nval fs_s3_awsSecretAccessKey  = sys.env.get(\"AWS_SECRET_ACCESS_KEY\").getOrElse(\"<hard-code-one>\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Reset the spark context with complete configuration"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "reset(lastChanges = _.set(\"spark.serializer\", \"org.apache.spark.serializer.KryoSerializer\")\n                     .set(\"spark.kryo.registrator\", \"org.bdgenomics.adam.serialization.ADAMKryoRegistrator\")\n                     .set(\"spark.kryoserializer.buffer.mb\", \"4\")\n                     .set(\"spark.kryo.referenceTracking\", \"true\")\n                     .setMaster(s\"spark://$master:7077\")\n                     .setAppName(\"ADAM 1000genomes\")\n                     .set(\"spark.executor.memory\", \"13g\")\n                     .set(\"fs.s3.awsAccessKeyId\", fs_s3_awsAccessKeyId)\n                     .set(\"fs.s3.awsSecretAccessKey\", fs_s3_awsSecretAccessKey)\n)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Import ADAM and other classes"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.hadoop.fs.{FileSystem, Path}\n\n//import org.bdgenomics.adam.converters.{ VCFLine, VCFLineConverter, VCFLineParser }\nimport org.bdgenomics.formats.avro.{Genotype, FlatGenotype}\nimport org.bdgenomics.adam.models.VariantContext\nimport org.bdgenomics.adam.rdd.ADAMContext._\n//import org.bdgenomics.adam.rdd.variation.VariationContext._\nimport org.bdgenomics.adam.rdd.ADAMContext\n  \nimport org.apache.spark.rdd.RDD",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Genotypes\nCreate a Genotype RDD from an ADAM formatted 1000genomes chromosome (22 here) stored on s3"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val adamFileOnS3 = \"s3n://med-at-scale/1000genomes/ALL.chr22.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.adam\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val gts:RDD[Genotype] = sparkContext.adamLoad(adamFileOnS3)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Make a copy on hdfs if required to speed up a bit..."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "gts.adamParquetSave(hu(\"chr22.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val gts:RDD[Genotype] = sparkContext.adamLoad(hu(\"chr22.adam\"))\ngts.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Filter variants with position in 16,000,000-17,000,000"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val start = 16000000\nval end   = 26000000\nval sampledGts = gts.filter(g => (g.getVariant.getStart >= start && g.getVariant.getEnd <= end) )",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Save a local copy of the sample to speedup processing"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "sampledGts.adamParquetSave(hu(\"chr22-sample-10M.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sampledGts:RDD[Genotype] = sparkContext.adamLoad(hu(\"chr22-sample-10M.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Population: List of populations selected for analysis"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// populations to select\nval pops = Set(\"GBR\", \"ASW\", \"CHB\")\nval popsList = pops.toList\nval popsIdx = popsList.zipWithIndex.toMap",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "We download on the locafilesystem the list of samples IDs and correspondig populations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val panelFile = \"/vol0/data/ALL.panel\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "s\"wget ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/release/20130502/integrated_call_samples_v3.20130502.ALL.panel -O ${panelFile}\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Panel"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import scala.io.Source\ndef extract(filter: (String, String) => Boolean= (s, t) => true) = Source.fromFile(panelFile).getLines().map( line => {\n  val toks = line.split(\"\\t\").toList\n  toks(0) -> toks(1)\n}).toMap.filter( tup => filter(tup._1, tup._2) )\n  \n// panel extract from file, filtering by the 2 populations\ndef panel: Map[String,String] = \n  extract((sampleID: String, pop: String) => pops.contains(pop)) \n  \n// broadcast the panel \nval bPanel = sparkContext.broadcast(panel)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Filter genotypes further on selected populations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val finalGts = sampledGts.filter(g =>  bPanel.value.contains(g.getSampleId))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Count samples"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sampleCount = finalGts.map(_.getSampleId.toString.hashCode).distinct.count\ns\"#Samples: $sampleCount\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Ensuring completness"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "//utils\nimport scala.collection.JavaConverters._\nimport org.bdgenomics.formats.avro._\ndef variantId(g:Genotype):String = {\n  val name = g.getVariant.getContig.getContigName\n    val start = g.getVariant.getStart\n    val end = g.getVariant.getEnd\n    s\"$name:$start:$end\"\n}\ndef asDouble(g:Genotype):Double = g.getAlleles.asScala.count(_ != GenotypeAllele.Ref)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.SparkContext._\n// A VARIANT SHOULD HAVE sampleCount GENOTYPES\nval variantsById = finalGts.keyBy(g => variantId(g).hashCode).groupByKey.cache\nval missingVariantsRDD = variantsById.filter { case (k, it) => it.size != sampleCount }.keys\n\n// IF SAVING LIST OF MISSING VARIANTS IS REQUIRED...\n//missingVariantsRDD.saveAsObjectFile(hu(\"/model/missing-variants\"))\n\n// could be broadcased as well...\nval missingVariants = missingVariantsRDD.collect().toSet",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## Now compute KMeans and use distances to Center as features"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// list samples\nval sampleList = completeGts.map{ g => (g.getSampleId.toString) }.distinct.collect\nval sampleIdx = sampleList.zipWithIndex.map(tup => (tup._1, tup._2.toLong)).toMap\n// broadcast the samples list \nval bSampleIdx  = sparkContext.broadcast(sampleIdx)\nval bSampleList = sparkContext.broadcast(sampleList)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Build the complete dataframe"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.linalg.{Vector=>MLVector, Vectors}\ndef makeSortedVector(gts: Iterable[(Double, Int)]): MLVector = Vectors.dense( gts.toArray.sortBy(_._2).map(_._1) )\n\nval dataPerSampleId:RDD[(String, MLVector)] =\n    groupedSampleToData.mapValues { it =>\n        makeSortedVector(it)\n    }\nval dataFrame:RDD[MLVector] = dataPerSampleId.values",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Split Training / Test sets (70/30)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val splits = dataPerSampleId.randomSplit(Array(0.7, 0.3), seed = 111234545L)\n\nval trainingPerSampleId = splits(0)\nval testPerSampleId = splits(1)\n\nval training = trainingPerSampleId.values\nval test = trainingPerSampleId.values",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Train the KMeans model with 25 clusters on the Training set"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.clustering.{KMeans,KMeansModel}\nimport org.apache.spark.mllib.clustering.VectorWithNorm\nval model: KMeansModel = KMeans.train(training, 25, 20)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### hmm norm, dot and distance..."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.linalg.SparseVector\nval EPSILON = {\n   var eps = 1.0\n   while ((1.0 + (eps / 2.0)) != 1.0) {\n     eps /= 2.0\n   }\n   eps\n }                                    \ndef dot(v1: MLVector, v2: MLVector): Double = v1.toArray.zip(v2.toArray).map(t => t._1*t._2).reduce(_+_)\n\ndef fastSquaredDistance(\n      v1: MLVector,\n      norm1: Double,\n      v2: MLVector,\n      norm2: Double,\n      precision: Double = 1e-6): Double = {\n    val n = v1.size\n    require(v2.size == n)\n    require(norm1 >= 0.0 && norm2 >= 0.0)\n    val sumSquaredNorm = norm1 * norm1 + norm2 * norm2\n    val normDiff = norm1 - norm2\n    var sqDist = 0.0\n    /*\n     * The relative error is\n     * <pre>\n     * EPSILON * ( \\|a\\|_2^2 + \\|b\\\\_2^2 + 2 |a^T b|) / ( \\|a - b\\|_2^2 ),\n     * </pre>\n     * which is bounded by\n     * <pre>\n     * 2.0 * EPSILON * ( \\|a\\|_2^2 + \\|b\\|_2^2 ) / ( (\\|a\\|_2 - \\|b\\|_2)^2 ).\n     * </pre>\n     * The bound doesn't need the inner product, so we can use it as a sufficient condition to\n     * check quickly whether the inner product approach is accurate.\n     */\n    val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)\n    if (precisionBound1 < precision) {\n      sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)\n    } else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {\n      val dotValue = dot(v1, v2)\n      sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)\n      val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) /\n        (sqDist + EPSILON)\n      if (precisionBound2 > precision) {\n        sqDist = Vectors.sqdist(v1, v2)\n      }\n    } else {\n      sqDist = Vectors.sqdist(v1, v2)\n    }\n    sqDist\n  }",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Now compute the distances to centroids and use as new features"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.regression.LabeledPoint",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val trainingDists = trainingPerSampleId.map{ rr =>\n  val dists = model.clusterCenters.map{ x =>\n          fastSquaredDistance(x, Vectors.norm(x, 2.0), rr._2, Vectors.norm(rr._2, 2.0))\n                        }\n  new LabeledPoint(popsIdx(bPanel.value(rr._1)).toDouble, Vectors.dense(dists))\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val testDists = testPerSampleId.map{ rr =>\n  val dists = model.clusterCenters.map{ x =>\n          fastSquaredDistance(x, Vectors.norm(x, 2.0), rr._2, Vectors.norm(rr._2, 2.0))\n                        }\n  new LabeledPoint(popsIdx(bPanel.value(rr._1)).toDouble, Vectors.dense(dists))\n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Now train the Random Forest"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.tree.RandomForest",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val numClasses = 3\nval categoricalFeaturesInfo = Map[Int, Int]()\nval numTrees = 10 // Use more in practice.\nval featureSubsetStrategy = \"auto\" // Let the algorithm choose.\nval impurity = \"gini\"\nval maxDepth = 4\nval maxBins = 32",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val rfModel = RandomForest.trainClassifier(trainingDists, numClasses, categoricalFeaturesInfo,\n  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val labelAndPreds = testDists.map { point =>\n  val prediction = rfModel.predict(point.features)\n  (point.label, prediction)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testDists.count()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val confMat = labelAndPreds.collect.toList//.values\n    .groupBy(_._2).mapValues(_.map(_._1))\n    .mapValues(xs => (1 to 3).map( i => xs.count(_ == i-1)).toList)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "<table>\n<tr><td></td><td>#0</td><td>#1</td><td>#2</td></tr>\n{ for (popu <- confMat) yield\n  <tr><td>{popu._1}</td> { for (cnt <- popu._2) yield \n    <td>{cnt}</td>\n  }\n  </tr>\n}\n</table>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val labelAndPredsOnTraining = trainingDists.map { point =>\n  val prediction = rfModel.predict(point.features)\n  (point.label, prediction)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val trainingErr = labelAndPredsOnTraining.filter(r => r._1 != r._2).count.toDouble / trainingDists.count()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val confMatTraining = labelAndPredsOnTraining.collect.toList//.values\n    .groupBy(_._2).mapValues(_.map(_._1))\n    .mapValues(xs => (1 to 3).map( i => xs.count(_ == i-1)).toList)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "<table>\n<tr><td></td><td>#0</td><td>#1</td><td>#2</td></tr>\n{ for (popu <- confMatTraining) yield\n  <tr><td>{popu._1}</td> { for (cnt <- popu._2) yield \n    <td>{cnt}</td>\n  }\n  </tr>\n}\n</table>",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}